﻿using WorkFlowCore.Common.EventBus.Implements.Kafka;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace WorkFlowCore.Common.EventBus
{
    /// <summary>
    /// <see cref="IEventBus"/> implementation backed by Kafka.
    /// Every (event data type, handler type) pair gets its own consumer that polls on a
    /// dedicated background thread; events are published as JSON-serialized messages.
    /// </summary>
    /// <remarks>
    /// Subscriptions are tracked in static registries, so they are shared by every
    /// instance of this class in the process. All access to the registries is guarded
    /// by <see cref="objLock"/>.
    /// </remarks>
    public class KafkaEventBus : IEventBus
    {
        // Guards both static registries below.
        private static readonly object objLock = new object();

        // Active consumers keyed by "<event type full name><handler type full name>".
        private static readonly Dictionary<string, IConsumer<Ignore, string>> eventSubscribes =
            new Dictionary<string, IConsumer<Ignore, string>>();

        // Cancellation sources used to stop the matching consume loop, same key as above.
        private static readonly Dictionary<string, CancellationTokenSource> eventSubscribeCancellationTokenSources =
            new Dictionary<string, CancellationTokenSource>();

        private readonly IServiceProvider serviceProvider;
        private readonly KafkaEventConfig eventConfig;

        /// <summary>
        /// Creates the event bus.
        /// </summary>
        /// <param name="serviceProvider">Provider used to create a scope and resolve the handler for each consumed message.</param>
        /// <param name="eventConfig">Kafka settings; <c>Servers</c> is used as the bootstrap server list.</param>
        public KafkaEventBus(IServiceProvider serviceProvider, KafkaEventConfig eventConfig)
        {
            this.serviceProvider = serviceProvider;
            this.eventConfig = eventConfig;
            Console.WriteLine(eventConfig.Servers);
        }

        /// <summary>
        /// Builds a consumer for the given pair and blocks the calling thread polling it
        /// until the subscription is cancelled through <see cref="Unsubscribe"/>.
        /// Returns immediately when the pair is already subscribed, when the event type
        /// lacks <see cref="KafkaEventTopicAttribute"/>, or when the handler type lacks
        /// <see cref="KafkaEventConsumerAttribute"/>.
        /// </summary>
        /// <param name="eventDataType">Type the message payload is deserialized into.</param>
        /// <param name="handlerType">Handler type resolved from a fresh DI scope per message.</param>
        private void RunConsumerLoop(Type eventDataType, Type handlerType)
        {
            var subscribesKey = eventDataType.FullName + handlerType.FullName;

            // Event types without the topic marker are not handled.
            var topicAttr = eventDataType.GetCustomAttribute<KafkaEventTopicAttribute>();
            if (topicAttr == null) return;
            var topic = string.IsNullOrEmpty(topicAttr.Topic) ? eventDataType.FullName : topicAttr.Topic;

            // Handler types without the consumer marker are not handled.
            var groupIdAttr = handlerType.GetCustomAttribute<KafkaEventConsumerAttribute>();
            if (groupIdAttr == null) return;
            var groupId = string.IsNullOrEmpty(groupIdAttr.GroupId) ? handlerType.FullName : groupIdAttr.GroupId;

            // Resolve the handler method once instead of once per message, and fail with a
            // clear message instead of a NullReferenceException on every consumed record.
            var handleMethod = handlerType.GetMethod("Handle", new Type[] { eventDataType });
            if (handleMethod == null)
            {
                Console.WriteLine($"Error occured: {handlerType.FullName} has no public Handle({eventDataType.FullName}) method.");
                return;
            }

            var conf = new ConsumerConfig
            {
                GroupId = groupId,
                BootstrapServers = eventConfig.Servers,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                // Offsets are committed manually, only after the handler has run successfully.
                EnableAutoCommit = false,
            };

            IConsumer<Ignore, string> consumer;
            CancellationTokenSource cts;
            lock (objLock)
            {
                // Check and register atomically so concurrent subscribe calls for the same
                // pair can never start two consumers.
                if (eventSubscribes.ContainsKey(subscribesKey)) return;

                consumer = new ConsumerBuilder<Ignore, string>(conf).Build();
                cts = new CancellationTokenSource();
                eventSubscribes.Add(subscribesKey, consumer);
                eventSubscribeCancellationTokenSources[subscribesKey] = cts;
            }

            try
            {
                consumer.Subscribe(topic);

                while (!cts.IsCancellationRequested)
                {
                    try
                    {
                        var cr = consumer.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Message}' at: '{cr.TopicPartitionOffset}'.");

                        var data = JsonConvert.DeserializeObject(cr.Message.Value, eventDataType);
                        using (var scope = serviceProvider.CreateScope())
                        {
                            var handler = scope.ServiceProvider.GetService(handlerType);
                            if (handler == null)
                            {
                                throw new InvalidOperationException($"{handlerType.FullName} is not registered in the service provider.");
                            }

                            handleMethod.Invoke(handler, new object[] { data });
                        }

                        // Reached only when the handler did not throw.
                        consumer.Commit(cr);
                    }
                    catch (OperationCanceledException)
                    {
                        // Normal shutdown requested through Unsubscribe; not an error.
                        break;
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine($"Error occured: {e.ToString()}");
                    }
                }
            }
            finally
            {
                lock (objLock)
                {
                    // Drop the registration only if it is still ours: after an unsubscribe
                    // followed by a new subscribe the key may already belong to a newer consumer.
                    IConsumer<Ignore, string> registered;
                    if (eventSubscribes.TryGetValue(subscribesKey, out registered) && ReferenceEquals(registered, consumer))
                    {
                        eventSubscribes.Remove(subscribesKey);
                        eventSubscribeCancellationTokenSources.Remove(subscribesKey);
                    }
                }

                try
                {
                    // Close exactly once so the consumer leaves the group cleanly.
                    consumer.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Error occured: {e.ToString()}");
                }
                finally
                {
                    consumer.Dispose();
                    cts.Dispose();
                }
            }
        }

        /// <summary>
        /// Starts the consume loop for the given pair in the background and returns immediately.
        /// </summary>
        /// <param name="eventDataType">Event data type to consume.</param>
        /// <param name="handlerType">Handler type that processes the event.</param>
        private void Subscribe(Type eventDataType, Type handlerType)
        {
            // The loop blocks for the lifetime of the subscription, so request a dedicated
            // thread rather than occupying a thread-pool worker indefinitely.
            Task.Factory.StartNew(
                () =>
                {
                    try
                    {
                        RunConsumerLoop(eventDataType, handlerType);
                    }
                    catch (Exception e)
                    {
                        // Nobody awaits this task; log instead of losing the failure.
                        Console.WriteLine($"Error occured: {e.ToString()}");
                    }
                },
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);
        }

        /// <summary>
        /// Stops the consume loop for the given pair, if one is registered, and removes
        /// its registration so the pair can be subscribed again later.
        /// </summary>
        /// <param name="eventDataType">Event data type of the subscription.</param>
        /// <param name="handlerType">Handler type of the subscription.</param>
        private void Unsubscribe(Type eventDataType, Type handlerType)
        {
            var subscribesKey = eventDataType.FullName + handlerType.FullName;

            lock (objLock)
            {
                eventSubscribes.Remove(subscribesKey);

                CancellationTokenSource cts;
                if (eventSubscribeCancellationTokenSources.TryGetValue(subscribesKey, out cts))
                {
                    eventSubscribeCancellationTokenSources.Remove(subscribesKey);

                    // Cancel while holding the lock: the consume loop disposes the source
                    // only after passing through the same lock, so it cannot be disposed here.
                    // The consumer itself is closed and disposed by the loop's own thread
                    // rather than being touched from this one.
                    cts.Cancel();
                }
            }
        }

        /// <summary>
        /// Subscribes <typeparamref name="THandler"/> to events of type <typeparamref name="TData"/>.
        /// </summary>
        public void Subscribe<TData, THandler>() where THandler : IEventHandler<TData> where TData : BaseEventData
        {
            Subscribe(typeof(TData), typeof(THandler));
        }

        /// <summary>
        /// Subscribes the handler type, deriving the event data type from its
        /// <see cref="IEventHandler{TData}"/> implementation.
        /// </summary>
        public void Subscribe<EventHandler>() where EventHandler : IEventHandler
        {
            var handlerType = typeof(EventHandler);
            Subscribe(handlerType);
        }

        /// <summary>
        /// Subscribes the handler type, deriving the event data type from the first
        /// <see cref="IEventHandler{TData}"/> interface it implements.
        /// </summary>
        /// <param name="handlerType">Handler type to subscribe.</param>
        /// <exception cref="ArgumentException">The type does not implement <see cref="IEventHandler{TData}"/>.</exception>
        public void Subscribe(Type handlerType)
        {
            // Match IEventHandler<> specifically; a handler may also implement unrelated
            // generic interfaces whose type argument is not an event data type.
            var interfaceType = handlerType.GetInterfaces()
                .FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IEventHandler<>));

            if (interfaceType == null)
            {
                throw new ArgumentException($"{handlerType.FullName} 需实现 {typeof(IEventHandler<>).FullName}");
            }

            var dataType = interfaceType.GetGenericArguments()[0];
            Subscribe(dataType, handlerType);
        }

        /// <summary>
        /// Registers subscriptions for every concrete handler type found in the given assemblies.
        /// </summary>
        /// <param name="assemblies">Assemblies to scan for <see cref="IEventHandler"/> implementations.</param>
        public void RegistSubscriptions(params Assembly[] assemblies)
        {
            if (assemblies == null) return;

            foreach (var assembly in assemblies)
            {
                // Only concrete classes can be resolved and invoked; interfaces (including
                // IEventHandler itself) and abstract bases are skipped.
                var types = assembly.GetTypes()
                    .Where(t => t.IsClass && !t.IsAbstract && typeof(IEventHandler).IsAssignableFrom(t));

                foreach (var type in types)
                {
                    Subscribe(type);
                }
            }
        }

        /// <summary>
        /// Serializes the event to JSON and publishes it, blocking until it is delivered
        /// or the flush timeout elapses. Null data is ignored.
        /// </summary>
        /// <typeparam name="TData">Event data type; determines the topic.</typeparam>
        /// <param name="data">Event payload.</param>
        private void TriggerEvent<TData>(TData data)
        {
            if (data == null) return;
            var conf = new ProducerConfig { BootstrapServers = eventConfig.Servers };

            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");

            // Resolve the topic the same way the consumer side does: a missing attribute or
            // an empty Topic both fall back to the type's full name, so producer and
            // consumer always agree on the topic.
            var topicAttr = typeof(TData).GetCustomAttribute<KafkaEventTopicAttribute>();
            var topic = topicAttr == null || string.IsNullOrEmpty(topicAttr.Topic)
                ? typeof(TData).FullName
                : topicAttr.Topic;

            using (var p = new ProducerBuilder<Null, string>(conf).Build())
            {
                p.Produce(topic, new Message<Null, string> { Value = JsonConvert.SerializeObject(data) }, handler);

                // wait for up to 10 seconds for any inflight messages to be delivered.
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }

        /// <summary>
        /// Publishes the event in the background and returns immediately.
        /// </summary>
        /// <typeparam name="TData">Event data type.</typeparam>
        /// <param name="data">Event payload.</param>
        public void Trigger<TData>(TData data)
        {
            Task.Run(() =>
            {
                try
                {
                    TriggerEvent(data);
                }
                catch (Exception e)
                {
                    // Nobody awaits this task; log instead of losing the failure.
                    Console.WriteLine($"Error occured: {e.ToString()}");
                }
            });
        }

        /// <summary>
        /// Subscribes the handler type to the event data type.
        /// </summary>
        public void SubscribeEventHandler(Type eventDataType, Type handlerType)
        {
            Subscribe(eventDataType, handlerType);
        }

        /// <summary>
        /// Subscribes <typeparamref name="THandler"/> to events of type <typeparamref name="TData"/>.
        /// </summary>
        public void SubscribeEventHandler<TData, THandler>() where THandler : IEventHandler<TData> where TData : BaseEventData
        {
            Subscribe<TData, THandler>();
        }

        /// <summary>
        /// Stops the subscription of the handler type to the event data type.
        /// </summary>
        public void UnsubscribeEventHandler(Type eventDataType, Type handlerType)
        {
            Unsubscribe(eventDataType, handlerType);
        }

        /// <summary>
        /// Stops the subscription of <typeparamref name="THandler"/> to events of type <typeparamref name="TData"/>.
        /// </summary>
        public void UnsubscribeEventHandler<TData, THandler>() where THandler : IEventHandler<TData> where TData : BaseEventData
        {
            Unsubscribe(typeof(TData), typeof(THandler));
        }
    }
}
